-
Notifications
You must be signed in to change notification settings - Fork 139
feat: add CheckpointLogReplayProcessor in new checkpoints mod
#744
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #744 +/- ##
==========================================
+ Coverage 85.03% 85.06% +0.02%
==========================================
Files 84 84
Lines 20656 20802 +146
Branches 20656 20802 +146
==========================================
+ Hits 17565 17695 +130
- Misses 2226 2229 +3
- Partials 865 878 +13 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
V1CheckpointLogReplayScanner in new checkpoints modCheckpointLogReplayScanner in new checkpoints mod
zachschuermann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flushing a few comments. can we (1) describe the overall requirements/flow and (2) if we decide to refactor, have a quick design on the refactor and what it enables and then open a separate PR
zachschuermann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dropped a few comments, general direction looks good. let's split out the new LogReplayProcessor trait into a refactor PR first (and include some description for the motivation - could also just do a new PR that links here to show how it's used)
| // Remove action for file1 with a different deletion vector | ||
| r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#, | ||
| // Remove action for file1 with another different deletion vector | ||
| r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"THREE","pathOrInlineDv":"dv3","offset":1,"sizeInBytes":36,"cardinality":2}}}"#, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated this single-batch from a single-commit file test case as it's not a valid scenario:
"The intersection of the primary keys in the add collection and remove collection must be empty."
Instead we test repeating primary keys spanning across multiple commit files below in test_checkpoint_actions_iter_file_actions_with_deletion_vectors
|
|
||
| // TODO: Teach expression eval to respect the selection vector we just computed so carefully! | ||
| let result = self.add_transform.evaluate(actions_batch)?; | ||
| let result = self.add_transform.evaluate(actions_batch.as_ref())?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This evaluation returns a Box<EngineData> (mentioned in reasoning for updating parameter type for process_actions_batch in other comment)
zachschuermann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still need to review tests but looks good!
nicklan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, with a nit about not panicing
| /// * `i` - Index position in the data structure to examine | ||
| /// * `getters` - Collection of data getter implementations used to access the data | ||
| /// * `skip_removes` - Whether to skip remove actions when extracting file actions | ||
| /// # Parameters |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor formatting changes
zachschuermann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with a few nits + ask on testing - if we need more tests I think fine to unblock this and open a follow-up
| // Duplicates that should be skipped | ||
| r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#, | ||
| r#"{"metaData":{"id":"test2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, | ||
| r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we check the latest is returned by making this version different than above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right below this line is another txn action with a unique appId which covers the scenario you mentioned
| assert_eq!(add_actions, 2); | ||
|
|
||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are tests exhaustive? let's not block this PR but perhaps a follow up if we need to cover:
- remove tombstones before/after expiration
- we skip CommitInfo, CDC, CheckpointMetadata, Sidecar
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think both of your mentioned scenarios are sufficiently tested in the existing visitor level unit tests:
test_checkpoint_visitor_boundary_cases_for_tombstone_expirationtest_checkpoint_visitor– this test already verifies the exclusion of CommitInfo, cdc, and sidecar actions. (I'll update this test in this PR to also include the checkpointMetadata action in the input batch.)
I'm not sure it's worthwhile to add additional tests specifically for cross-batch behavior, but happy to revisit if you think there's value in doing so, let me know!
|
@sebastiantia this isn't a breaking change right? let's remove label if it was accidentally/incorrectly added before |
What changes are proposed in this pull request?
resolves #743
When the checkpoint API is called, we need to return all actions to be included in the checkpoint file for the engine to write. The returned actions will be
EngineDatabatches, where each batch has an accompanying selection vector which informs the engine of which actions in the batch to write/not write to the checkpoint file.To generate this filtered actions iterator:
This PR introduces the
CheckpointLogReplayProcessorwhich implements theLogReplayProcessortrait. This processor is applied to an iterator of action batches read from the log segment with the newcheckpoint_actions_itermethod, in order to transform the input actions iterator into an iterator ofFilteredEngineData, which includes the log data accompanied with a selection vector indicating which rows should be included in the checkpoint file.How was this change tested?
These tests test the application of the
CheckpointVisitorover multiple batches. The visiting of individual batches is already tested.test_checkpoint_actions_iter_non_file_actionstest_checkpoint_actions_iter_file_actionstest_checkpoint_actions_iter_file_actions_with_deletion_vectors